iT邦幫忙

2025 iThome 鐵人賽

DAY 19
0

Day 19: 資料分析助手實作

今天我們要打造一個智能資料分析助手!結合 AI 的理解能力和數據處理工具,讓使用者能夠用自然語言進行數據探索、分析和視覺化。

📊 為什麼需要 AI 資料分析助手?

傳統的資料分析需要專業知識和編程技能,AI 助手可以:

  • 🗣️ 自然語言查詢:用日常語言提問數據問題
  • 📈 自動化分析:智能選擇合適的分析方法
  • 🎨 智能視覺化:自動生成適合的圖表
  • 💡 洞察發現:主動發現數據中的趨勢和異常
  • 📝 報告生成:自動產生分析報告

🏗 專案結構

data_analysis_assistant/
├── main.py                          # 主程式
├── core/
│   ├── __init__.py
│   ├── analysis_engine.py           # 分析引擎
│   └── data_processor.py            # 資料處理器
├── analyzers/
│   ├── __init__.py
│   ├── statistical_analyzer.py      # 統計分析器
│   ├── trend_analyzer.py            # 趨勢分析器
│   └── correlation_analyzer.py      # 相關性分析器
├── visualizers/
│   ├── __init__.py
│   └── chart_generator.py           # 圖表生成器
└── utils/
    ├── __init__.py
    ├── data_loader.py               # 資料載入器
    └── report_generator.py          # 報告生成器

🔧 核心實作

1. 資料處理器 (core/data_processor.py)

import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional

class DataProcessor:
    """資料處理器"""
    
    def __init__(self):
        self.data = None
        self.data_info = {}
    
    def load_data(self, data: pd.DataFrame) -> Dict[str, Any]:
        """載入資料"""
        self.data = data
        
        # 收集資料資訊
        self.data_info = {
            'rows': len(data),
            'columns': len(data.columns),
            'column_names': list(data.columns),
            'dtypes': data.dtypes.to_dict(),
            'missing_values': data.isnull().sum().to_dict(),
            'numeric_columns': list(data.select_dtypes(include=[np.number]).columns),
            'categorical_columns': list(data.select_dtypes(include=['object']).columns)
        }
        
        print(f"✅ 資料已載入:{self.data_info['rows']} 行 × {self.data_info['columns']} 列")
        
        return self.data_info
    
    def get_basic_stats(self, column: str = None) -> Dict[str, Any]:
        """獲取基本統計資訊"""
        if self.data is None:
            return {"error": "尚未載入資料"}
        
        if column:
            if column not in self.data.columns:
                return {"error": f"欄位 '{column}' 不存在"}
            
            col_data = self.data[column]
            
            if pd.api.types.is_numeric_dtype(col_data):
                return {
                    'column': column,
                    'type': 'numeric',
                    'count': int(col_data.count()),
                    'mean': float(col_data.mean()),
                    'median': float(col_data.median()),
                    'std': float(col_data.std()),
                    'min': float(col_data.min()),
                    'max': float(col_data.max()),
                    'missing': int(col_data.isnull().sum())
                }
            else:
                value_counts = col_data.value_counts()
                return {
                    'column': column,
                    'type': 'categorical',
                    'count': int(col_data.count()),
                    'unique': int(col_data.nunique()),
                    'top_value': str(value_counts.index[0]) if len(value_counts) > 0 else None,
                    'top_freq': int(value_counts.iloc[0]) if len(value_counts) > 0 else 0,
                    'missing': int(col_data.isnull().sum())
                }
        else:
            # 整體統計
            return {
                'numeric_summary': self.data.describe().to_dict(),
                'missing_summary': self.data.isnull().sum().to_dict()
            }
    
    def filter_data(self, conditions: Dict[str, Any]) -> pd.DataFrame:
        """過濾資料"""
        if self.data is None:
            return pd.DataFrame()
        
        filtered = self.data.copy()
        
        for column, condition in conditions.items():
            if column not in filtered.columns:
                continue
            
            operator = condition.get('operator', '==')
            value = condition.get('value')
            
            if operator == '==':
                filtered = filtered[filtered[column] == value]
            elif operator == '>':
                filtered = filtered[filtered[column] > value]
            elif operator == '<':
                filtered = filtered[filtered[column] < value]
            elif operator == '>=':
                filtered = filtered[filtered[column] >= value]
            elif operator == '<=':
                filtered = filtered[filtered[column] <= value]
            elif operator == 'contains':
                filtered = filtered[filtered[column].astype(str).str.contains(str(value), na=False)]
        
        return filtered
    
    def group_analysis(self, group_by: str, agg_column: str, 
                      agg_func: str = 'mean') -> Dict[str, Any]:
        """分組分析"""
        if self.data is None:
            return {"error": "尚未載入資料"}
        
        if group_by not in self.data.columns or agg_column not in self.data.columns:
            return {"error": "欄位不存在"}
        
        try:
            result = self.data.groupby(group_by)[agg_column].agg(agg_func)
            
            return {
                'group_by': group_by,
                'agg_column': agg_column,
                'agg_func': agg_func,
                'results': result.to_dict()
            }
        except Exception as e:
            return {"error": str(e)}

2. 統計分析器 (analyzers/statistical_analyzer.py)

import pandas as pd
import numpy as np
from scipy import stats
from typing import Dict, Any, List

class StatisticalAnalyzer:
    """統計分析器"""
    
    def analyze_distribution(self, data: pd.Series) -> Dict[str, Any]:
        """分析資料分佈"""
        if not pd.api.types.is_numeric_dtype(data):
            return {"error": "只能分析數值型資料"}
        
        # 基本統計
        analysis = {
            'mean': float(data.mean()),
            'median': float(data.median()),
            'mode': float(data.mode().iloc[0]) if len(data.mode()) > 0 else None,
            'std': float(data.std()),
            'variance': float(data.var()),
            'skewness': float(data.skew()),
            'kurtosis': float(data.kurtosis())
        }
        
        # 四分位數
        analysis['quartiles'] = {
            'Q1': float(data.quantile(0.25)),
            'Q2': float(data.quantile(0.5)),
            'Q3': float(data.quantile(0.75)),
            'IQR': float(data.quantile(0.75) - data.quantile(0.25))
        }
        
        # 異常值檢測
        Q1 = data.quantile(0.25)
        Q3 = data.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        outliers = data[(data < lower_bound) | (data > upper_bound)]
        analysis['outliers'] = {
            'count': int(len(outliers)),
            'percentage': float(len(outliers) / len(data) * 100),
            'values': outliers.tolist()[:10]  # 最多顯示 10 個
        }
        
        return analysis
    
    def correlation_analysis(self, df: pd.DataFrame, 
                           columns: List[str] = None) -> Dict[str, Any]:
        """相關性分析"""
        numeric_df = df.select_dtypes(include=[np.number])
        
        if columns:
            numeric_df = numeric_df[columns]
        
        if len(numeric_df.columns) < 2:
            return {"error": "需要至少兩個數值欄位"}
        
        # 計算相關係數矩陣
        corr_matrix = numeric_df.corr()
        
        # 找出強相關的配對
        strong_correlations = []
        for i in range(len(corr_matrix.columns)):
            for j in range(i+1, len(corr_matrix.columns)):
                corr_value = corr_matrix.iloc[i, j]
                if abs(corr_value) > 0.5:  # 相關係數 > 0.5
                    strong_correlations.append({
                        'column1': corr_matrix.columns[i],
                        'column2': corr_matrix.columns[j],
                        'correlation': float(corr_value),
                        'strength': 'strong' if abs(corr_value) > 0.7 else 'moderate'
                    })
        
        return {
            'correlation_matrix': corr_matrix.to_dict(),
            'strong_correlations': sorted(
                strong_correlations, 
                key=lambda x: abs(x['correlation']), 
                reverse=True
            )
        }
    
    def hypothesis_test(self, data1: pd.Series, data2: pd.Series,
                       test_type: str = 't-test') -> Dict[str, Any]:
        """假設檢定"""
        if test_type == 't-test':
            # 雙樣本 t 檢定
            statistic, p_value = stats.ttest_ind(data1.dropna(), data2.dropna())
            
            return {
                'test_type': 't-test',
                'statistic': float(statistic),
                'p_value': float(p_value),
                'significant': p_value < 0.05,
                'conclusion': '兩組有顯著差異' if p_value < 0.05 else '兩組無顯著差異'
            }
        
        return {"error": "不支援的檢定類型"}

3. AI 分析引擎 (core/analysis_engine.py)

import google.generativeai as genai
import os
from typing import Dict, Any
import json

genai.configure(api_key=os.getenv('GEMINI_API_KEY'))

class AIAnalysisEngine:
    """AI 分析引擎"""
    
    def __init__(self):
        self.model = genai.GenerativeModel('gemini-2.5-flash')
    
    def interpret_query(self, query: str, data_info: Dict[str, Any]) -> Dict[str, Any]:
        """解析自然語言查詢"""
        prompt = f"""
        使用者想要分析以下資料,請解析他的查詢意圖:
        
        資料資訊:
        - 欄位:{', '.join(data_info.get('column_names', []))}
        - 數值欄位:{', '.join(data_info.get('numeric_columns', []))}
        - 類別欄位:{', '.join(data_info.get('categorical_columns', []))}
        
        使用者查詢:{query}
        
        請以 JSON 格式回應:
        {{
            "analysis_type": "descriptive | correlation | comparison | trend | prediction",
            "target_columns": ["欄位名稱"],
            "parameters": {{}},
            "explanation": "分析說明"
        }}
        
        只回傳 JSON,不要其他文字。
        """
        
        try:
            response = self.model.generate_content(prompt)
            result = json.loads(response.text)
            print(f"🤖 AI 理解:{result.get('explanation', '未知')}")
            return result
        except Exception as e:
            print(f"⚠️ 查詢解析失敗:{e}")
            return {
                "analysis_type": "descriptive",
                "target_columns": data_info.get('numeric_columns', [])[:1],
                "parameters": {},
                "explanation": "使用預設分析"
            }
    
    def generate_insights(self, analysis_results: Dict[str, Any], 
                         context: str = "") -> str:
        """生成資料洞察"""
        prompt = f"""
        請根據以下資料分析結果,生成專業的洞察報告:
        
        {context}
        
        分析結果:
        {json.dumps(analysis_results, ensure_ascii=False, indent=2)}
        
        請提供:
        1. 主要發現(3-5 點)
        2. 數據特徵解讀
        3. 可能的業務含義
        4. 建議的後續行動
        
        使用清晰、專業的語言。
        """
        
        try:
            response = self.model.generate_content(prompt)
            return response.text
        except Exception as e:
            return f"洞察生成失敗:{str(e)}"
    
    def suggest_visualization(self, analysis_type: str, 
                            columns: list) -> Dict[str, str]:
        """建議視覺化方式"""
        suggestions = {
            'descriptive': {
                'chart_type': 'histogram',
                'reason': '直方圖適合展示數值資料的分佈情況'
            },
            'correlation': {
                'chart_type': 'heatmap',
                'reason': '熱力圖能清楚展示變數間的相關性'
            },
            'comparison': {
                'chart_type': 'bar_chart',
                'reason': '長條圖適合比較不同組別的數值'
            },
            'trend': {
                'chart_type': 'line_chart',
                'reason': '折線圖適合展示時間序列趨勢'
            }
        }
        
        return suggestions.get(analysis_type, {
            'chart_type': 'scatter',
            'reason': '散佈圖適合探索資料關係'
        })

4. 報告生成器 (utils/report_generator.py)

from typing import Dict, Any
from datetime import datetime

class ReportGenerator:
    """報告生成器"""
    
    def generate_report(self, analysis_results: Dict[str, Any],
                       insights: str, data_info: Dict[str, Any]) -> str:
        """生成分析報告"""
        
        report = f"""
╔══════════════════════════════════════════════════════════════╗
║                      資料分析報告                            ║
╚══════════════════════════════════════════════════════════════╝

📅 報告生成時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

【資料概覽】
─────────────────────────────────────────────────────────────
📊 資料規模:{data_info.get('rows', 0)} 行 × {data_info.get('columns', 0)} 列
📋 欄位清單:{', '.join(data_info.get('column_names', []))}
🔢 數值欄位:{len(data_info.get('numeric_columns', []))} 個
📝 類別欄位:{len(data_info.get('categorical_columns', []))} 個

【分析結果】
─────────────────────────────────────────────────────────────
"""
        
        # 添加統計資訊
        if 'mean' in analysis_results:
            report += f"""
統計摘要:
• 平均值:{analysis_results['mean']:.2f}
• 中位數:{analysis_results['median']:.2f}
• 標準差:{analysis_results['std']:.2f}
• 最小值:{analysis_results.get('min', 'N/A')}
• 最大值:{analysis_results.get('max', 'N/A')}
"""
        
        # 添加異常值資訊
        if 'outliers' in analysis_results:
            outliers = analysis_results['outliers']
            report += f"""
異常值檢測:
• 異常值數量:{outliers['count']}
• 異常值比例:{outliers['percentage']:.2f}%
"""
        
        # 添加相關性分析
        if 'strong_correlations' in analysis_results:
            report += "\n強相關性配對:\n"
            for corr in analysis_results['strong_correlations'][:5]:
                report += f"• {corr['column1']} ↔ {corr['column2']}: {corr['correlation']:.3f} ({corr['strength']})\n"
        
        # 添加 AI 洞察
        report += f"""
【AI 洞察分析】
─────────────────────────────────────────────────────────────
{insights}

【建議】
─────────────────────────────────────────────────────────────
✓ 定期監控關鍵指標變化
✓ 深入調查異常資料點
✓ 考慮進行更詳細的分組分析
✓ 建立資料監控儀表板

╚══════════════════════════════════════════════════════════════╝
"""
        
        return report

5. 主程式 (main.py)

import pandas as pd
import numpy as np
from core.data_processor import DataProcessor
from core.analysis_engine import AIAnalysisEngine
from analyzers.statistical_analyzer import StatisticalAnalyzer
from utils.report_generator import ReportGenerator

def create_sample_data():
    """創建範例資料"""
    np.random.seed(42)
    
    data = {
        '銷售額': np.random.normal(10000, 2000, 100),
        '訪客數': np.random.normal(500, 100, 100),
        '轉換率': np.random.uniform(0.01, 0.05, 100),
        '產品類別': np.random.choice(['A', 'B', 'C'], 100),
        '地區': np.random.choice(['北部', '中部', '南部'], 100)
    }
    
    return pd.DataFrame(data)

def main():
    """資料分析助手主程式"""
    print("📊 AI 資料分析助手")
    print("🤖 支援自然語言查詢、智能分析、自動報告生成")
    print("=" * 60)
    
    # 初始化組件
    processor = DataProcessor()
    ai_engine = AIAnalysisEngine()
    stat_analyzer = StatisticalAnalyzer()
    report_gen = ReportGenerator()
    
    # 載入範例資料
    print("\n📂 載入範例資料...")
    sample_data = create_sample_data()
    data_info = processor.load_data(sample_data)
    
    print(f"\n✅ 資料載入成功!")
    print(f"📋 可用欄位:{', '.join(data_info['column_names'])}")
    
    while True:
        print("\n" + "─" * 60)
        print("選擇操作:")
        print("1. 💬 自然語言查詢")
        print("2. 📊 基本統計分析")
        print("3. 🔗 相關性分析")
        print("4. 📈 分佈分析")
        print("5. 📑 生成完整報告")
        print("6. 🚪 退出")
        
        try:
            choice = input("\n請選擇 (1-6):").strip()
            
            if choice == '1':
                query = input("\n請用自然語言描述您想要的分析:").strip()
                
                if query:
                    print("\n🤖 分析查詢中...")
                    
                    # AI 解析查詢
                    intent = ai_engine.interpret_query(query, data_info)
                    
                    # 執行分析
                    if intent['analysis_type'] == 'descriptive':
                        target_col = intent['target_columns'][0] if intent['target_columns'] else data_info['numeric_columns'][0]
                        
                        stats = processor.get_basic_stats(target_col)
                        
                        print(f"\n📊 '{target_col}' 的統計摘要:")
                        for key, value in stats.items():
                            if key not in ['column', 'type']:
                                print(f"  • {key}: {value}")
                        
                        # 生成洞察
                        insights = ai_engine.generate_insights(stats, f"欄位:{target_col}")
                        print(f"\n💡 AI 洞察:\n{insights}")
                    
                    elif intent['analysis_type'] == 'correlation':
                        corr_results = stat_analyzer.correlation_analysis(processor.data)
                        
                        print("\n🔗 相關性分析結果:")
                        if corr_results.get('strong_correlations'):
                            for corr in corr_results['strong_correlations'][:5]:
                                print(f"  • {corr['column1']} ↔ {corr['column2']}: {corr['correlation']:.3f}")
                        else:
                            print("  未發現強相關性")
            
            elif choice == '2':
                print(f"\n可用欄位:{', '.join(data_info['numeric_columns'])}")
                column = input("請選擇要分析的欄位:").strip()
                
                if column in processor.data.columns:
                    stats = processor.get_basic_stats(column)
                    
                    print(f"\n📊 '{column}' 的統計資訊:")
                    for key, value in stats.items():
                        if key not in ['column', 'type']:
                            print(f"  • {key}: {value}")
                else:
                    print("❌ 欄位不存在")
            
            elif choice == '3':
                print("\n🔗 執行相關性分析...")
                corr_results = stat_analyzer.correlation_analysis(processor.data)
                
                if corr_results.get('strong_correlations'):
                    print("\n強相關性配對:")
                    for corr in corr_results['strong_correlations']:
                        symbol = "📈" if corr['correlation'] > 0 else "📉"
                        print(f"  {symbol} {corr['column1']} ↔ {corr['column2']}: {corr['correlation']:.3f} ({corr['strength']})")
                else:
                    print("  未發現顯著的相關性")
            
            elif choice == '4':
                print(f"\n可用欄位:{', '.join(data_info['numeric_columns'])}")
                column = input("請選擇要分析的欄位:").strip()
                
                if column in processor.data.columns:
                    print(f"\n📈 分析 '{column}' 的分佈...")
                    
                    dist_analysis = stat_analyzer.analyze_distribution(processor.data[column])
                    
                    print(f"\n分佈特徵:")
                    print(f"  • 平均值:{dist_analysis['mean']:.2f}")
                    print(f"  • 中位數:{dist_analysis['median']:.2f}")
                    print(f"  • 標準差:{dist_analysis['std']:.2f}")
                    print(f"  • 偏度:{dist_analysis['skewness']:.2f}")
                    print(f"  • 峰度:{dist_analysis['kurtosis']:.2f}")
                    
                    if dist_analysis['outliers']['count'] > 0:
                        print(f"\n⚠️  發現 {dist_analysis['outliers']['count']} 個異常值 ({dist_analysis['outliers']['percentage']:.1f}%)")
                else:
                    print("❌ 欄位不存在")
            
            elif choice == '5':
                print("\n📑 生成完整分析報告...")
                
                # 執行完整分析
                column = data_info['numeric_columns'][0]
                dist_analysis = stat_analyzer.analyze_distribution(processor.data[column])
                corr_results = stat_analyzer.correlation_analysis(processor.data)
                
                # 合併結果
                combined_results = {**dist_analysis, **corr_results}
                
                # 生成 AI 洞察
                insights = ai_engine.generate_insights(combined_results, "完整資料分析")
                
                # 生成報告
                report = report_gen.generate_report(combined_results, insights, data_info)
                
                print(report)
                
                # 詢問是否儲存
                save = input("\n是否儲存報告?(y/n):").strip().lower()
                if save == 'y':
                    filename = input("檔案名稱 (預設: report.txt):").strip() or "report.txt"
                    with open(filename, 'w', encoding='utf-8') as f:
                        f.write(report)
                    print(f"✅ 報告已儲存到 {filename}")
            
            elif choice == '6':
                print("\n👋 感謝使用!再見!")
                break
            
            else:
                print("❌ 無效的選擇")
        
        except KeyboardInterrupt:
            print("\n\n👋 再見!")
            break
        except Exception as e:
            print(f"\n❌ 發生錯誤:{e}")

if __name__ == "__main__":
    main()

🎯 系統特色

自然語言查詢:用日常語言進行資料分析
智能分析引擎:AI 自動選擇合適的分析方法
多維度分析:統計、相關性、分佈、趨勢等
自動洞察生成:AI 解讀分析結果並提供建議
專業報告輸出:格式化的分析報告

🚀 使用示範

📊 AI 資料分析助手
🤖 支援自然語言查詢、智能分析、自動報告生成

📂 載入範例資料...
✅ 資料已載入:100 行 × 5 列
📋 可用欄位:銷售額, 訪客數, 轉換率, 產品類別, 地區

選擇操作:
1. 💬 自然語言查詢

請用自然語言描述您想要的分析:
分析銷售額的平均值和分佈情況

🤖 分析查詢中...
🤖 AI 理解:分析銷售額欄位的描述性統計

📊 '銷售額' 的統計摘要:
  • count: 100
  • mean: 10023.45
  • median: 9987.32
  • std: 2001.78
  • min: 5432.10
  • max: 15678.90

💡 AI 洞察:
根據分析結果,銷售額資料呈現正態分佈特徵...
建議關注異常值並深入分析高低銷售額的原因。

今天我們打造了功能完整的 AI 資料分析助手,讓資料分析變得更簡單直覺。明天我們將學習程式碼生成助手開發,探索 AI 在程式設計領域的應用!


上一篇
Day 18: 人機協作介面設計
下一篇
Day 20: 程式碼生成助手開發
系列文
30 天從零到 AI 助理:Gemini CLI 與 LangGraph 輕鬆上手21
圖片
  熱門推薦
圖片
{{ item.channelVendor }} | {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言